#include "RtmpPuller.h"

#include <mutex>


// Builds an idle puller: no stream selected (indices -1, stream pointers
// null), no input context, no bitstream filter and no worker thread yet.
// Also performs the process-wide FFmpeg registration and network start-up.
RtmpPuller::RtmpPuller()
	:mFrameIndex(0), mAudioIndex(-1),mVideoIndex(-1)
	, mAudioStream(nullptr),mVideoStream(nullptr)
{
	// ConnectServer() hands &mIfmtCtx to avformat_open_input(), which expects
	// the pointer to be either NULL or a context it can adopt; an indeterminate
	// value there is undefined behaviour. These are assigned in the body rather
	// than in the initializer list because the member declaration order is not
	// visible from this file.
	mIfmtCtx = nullptr;
	mH264bsfc = nullptr;
	mThread = nullptr;
	// NOTE(review): mStatus is not given a value here; it is first written by
	// ConnectServer()/StartPull(). Confirm the header gives it a default.

	// Register muxers/demuxers/codecs (legacy FFmpeg API).
	av_register_all();
	// Bring up FFmpeg's network layer.
	avformat_network_init();
}

int RtmpPuller::ConnectServer(const char *p)
{
	int ret = 0;
	if ((ret = avformat_open_input(&mIfmtCtx, p, 0, 0)) < 0) {
		printf("Could not open input file.");
		return -1;
	}
	if ((ret = avformat_find_stream_info(mIfmtCtx, 0)) < 0) {
		printf("Failed to retrieve input stream information");
		return -1;
	}
	for (int i = 0; i < mIfmtCtx->nb_streams; i++) {
		if (mIfmtCtx->streams[i]->codec->codec_type == AVMEDIA_TYPE_VIDEO) {
			mVideoIndex = i;
		}
		if (mIfmtCtx->streams[i]->codec->codec_type == AVMEDIA_TYPE_AUDIO) {
			mAudioIndex = i;
		}
	}
	if(mAudioIndex > -1)
		this->mAudioStream = mIfmtCtx->streams[mAudioIndex];
	if(mVideoIndex > -1)
		this->mVideoStream = mIfmtCtx->streams[mVideoIndex];
	av_dump_format(mIfmtCtx, 0, p, 0);
	mH264bsfc = av_bitstream_filter_init("h264_mp4toannexb");
	mStatus = RUNNING;
	if((mAudioIndex == -1 ) &&(mVideoIndex == -1))
		mStatus = NOSOURCE;
	return 0;
}

// Worker-thread entry point: calls p->PullData() repeatedly for as long as
// p->Status() reports RUNNING, then returns 0. The value returned by
// PullData() is ignored, so a failed read does not end the loop by itself.
int ThreadPull(RtmpPuller*p) {
	for (;;) {
		if (p->Status() != RtmpPuller::CAP_STATUS::RUNNING)
			break;
		p->PullData();
	}
	return 0;
}

// Marks the puller as RUNNING and launches the worker thread that runs
// ThreadPull(this). Always returns 0.
int RtmpPuller::StartPull()
{
	// The status must be RUNNING before the worker exists: ThreadPull() leaves
	// its loop the first time Status() is not RUNNING, so writing the status
	// after starting the thread could let the worker exit immediately.
	mStatus = RUNNING;

	// NOTE(review): the thread object is heap-allocated and nothing in this
	// file joins or deletes it; a second call replaces the previous pointer.
	this->mThread = new std::thread(ThreadPull, this);
	return 0;
}

int RtmpPuller::PullData()
{
	static int drop = 0;
	AVStream *in_stream;
	//Get an AVPacket
	int ret = av_read_frame(mIfmtCtx, &pkt);
	if (ret < 0)
		return -1;
	in_stream = mIfmtCtx->streams[pkt.stream_index];
	/* copy packet */
	//Convert PTS/DTS
	pkt.pts = av_rescale_q_rnd(pkt.pts, in_stream->time_base, in_stream->time_base, 
		(AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
	pkt.dts = av_rescale_q_rnd(pkt.dts, in_stream->time_base, in_stream->time_base, 
		(AVRounding)(AV_ROUND_NEAR_INF | AV_ROUND_PASS_MINMAX));
	pkt.duration = av_rescale_q(pkt.duration, in_stream->time_base, in_stream->time_base);
	pkt.pos = -1;
	//Print to Screen
	if (drop < 100) {
		drop++;
		goto end;
	}
	if (pkt.stream_index == mVideoIndex) {
		printf("Receive %8d video frames from input URL\n", mFrameIndex);
		mFrameIndex++;
		av_bitstream_filter_filter(mH264bsfc, in_stream->codec, NULL,
			&pkt.data, &pkt.size, pkt.data, pkt.size, 0);
		if (mObserver.size() > 0) {
			for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) {
				RtmpPullObserver *p = (RtmpPullObserver *)*itr;
				if (p->mObserverType == RtmpPullObserver::Observer_Video) {
					p->OnRtmpFrame(pkt.data, pkt.size);
				}
			}
		}
	}
	if (pkt.stream_index == mAudioIndex) {
		if (mObserver.size() > 0) {
			for (auto itr = this->mObserver.begin(); itr != mObserver.end(); itr++) {
				RtmpPullObserver *p = (RtmpPullObserver *)*itr;
				if (p->mObserverType == RtmpPullObserver::Observer_Audio) {
					p->OnRtmpFrame(pkt.data, pkt.size);
				}
			}
		}
	}
end:
	//printf("%02x %02x %02x %02x %02x\r\n", pkt.data[0], pkt.data[1], pkt.data[2], pkt.data[3], pkt.data[4]);
	av_free_packet(&pkt);
}

// Registers `p` as an observer unless the same pointer is already registered.
//
// p: observer to add; a null pointer is rejected.
//
// Returns -1 when `p` is null, otherwise 0 (both when the observer was newly
// added and when it was already present).
int RtmpPuller::SetObserver(RtmpPullObserver *p)
{
	if (nullptr == p)
		return -1;

	// Scoped lock: the mutex is released on every exit path, including the
	// early return taken when the observer is already registered.
	std::lock_guard<decltype(mMux)> guard(mMux);
	for (auto itr = mObserver.begin(); itr != mObserver.end(); ++itr) {
		if (p == *itr)
			return 0;
	}
	mObserver.push_back(p);
	return 0;
}

// Returns the puller's current status value (mStatus) as stored at the time
// of the call.
RtmpPuller::CAP_STATUS RtmpPuller::Status()
{
	return mStatus;
}

// Returns the audio stream selected by ConnectServer(), or nullptr when no
// audio stream has been selected.
AVStream * RtmpPuller::AudioStream()
{
	return mAudioStream;
}
